{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "![](https://europe-west1-atp-views-tracker.cloudfunctions.net/working-analytics?notebook=tutorials--fastapi-agent--fastapi-agent-tutorial)\n",
    "\n",
    "# Serving an Agent with FastAPI"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## What is FastAPI and Why Use It for Agents?\n",
    "\n",
    "FastAPI is a modern, high-performance web framework for building APIs with Python. Released in 2018, it has quickly gained popularity due to its combination of speed, ease of use, and developer-friendly features.\n",
    "\n",
    "At its core, FastAPI is designed to create REST APIs that can serve requests efficiently while providing robust validation and documentation. For AI agent deployment, FastAPI offers several critical advantages:\n",
    "\n",
    "- **Asynchronous Support**: AI agents often need to handle concurrent requests efficiently. FastAPI's native async/await support enables handling thousands of simultaneous connections, perfect for serving multiple agent requests in parallel without blocking.\n",
    "\n",
    "- **Streaming Responses**: Agents frequently generate content incrementally (token by token). FastAPI's streaming response capabilities allow for real-time transmission of agent outputs as they're generated, creating a more responsive user experience.\n",
    "\n",
    "- **Type Validation**: When working with agents, ensuring proper input formats is crucial. FastAPI uses Pydantic for automatic request validation, catching malformed inputs before they reach your agent and providing clear error messages.\n",
    "\n",
    "- **Performance**: Built on Starlette and Uvicorn, FastAPI offers near-native performance. For compute-intensive agent applications, this means your infrastructure handles API overhead efficiently, allowing more resources for the actual agent processing.\n",
    "\n",
    "- **Automatic Documentation**: When exposing an agent API to multiple users or teams, documentation becomes essential. FastAPI automatically generates interactive API documentation via Swagger UI and ReDoc, making it easy for others to understand and use your agent.\n",
    "\n",
    "- **Schema Enforcement**: Pydantic models ensure that both requests to your agent and responses from it conform to predefined schemas, making agent behavior more predictable and easier to integrate with other systems.\n",
    "\n",
    "In this tutorial, we'll build a complete API that serves an AI agent with both synchronous and streaming endpoints, demonstrating how FastAPI's features address the specific challenges of deploying agents in production."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Prerequisites\n",
    "\n",
    "Before we begin, let's install the necessary packages:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install fastapi uvicorn pydantic"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If you plan to use the streaming functionality, also install:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!pip install sse-starlette"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Agent Quick Recap\n",
    "\n",
    "Let's start by defining a simple agent that we'll expose via our API. This could be any agent implementation, but for this tutorial, we'll create a basic example that simulates an AI agent responding to user queries:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Agent FastAPI Agent received: 'Hello, what can you do?'\n",
      "Response: This is a simulated agent response.\n"
     ]
    }
   ],
   "source": [
    "class SimpleAgent:\n",
    "    def __init__(self, name=\"FastAPI Agent\"):\n",
    "        self.name = name\n",
    "    \n",
    "    def generate_response(self, query):\n",
    "        \"\"\"Generate a synchronous response to a user query\"\"\"\n",
    "        return f\"Agent {self.name} received: '{query}'\\nResponse: This is a simulated agent response.\"\n",
    "    \n",
    "    async def generate_response_stream(self, query):\n",
    "        \"\"\"Generate a streaming response to a user query\"\"\"\n",
    "        import asyncio\n",
    "        \n",
    "        prefix = f\"Agent {self.name} thinking about: '{query}'\\n\"\n",
    "        response = \"This is a simulated agent response that streams token by token.\"\n",
    "        \n",
    "        # Yield the prefix as a single chunk\n",
    "        yield prefix\n",
    "        \n",
    "        # Stream the response token by token with small delays\n",
    "        for token in response.split():\n",
    "            await asyncio.sleep(0.1)  # Simulate thinking time\n",
    "            yield token + \" \"\n",
    "\n",
    "# Test our agent directly\n",
    "agent = SimpleAgent()\n",
    "test_query = \"Hello, what can you do?\"\n",
    "print(agent.generate_response(test_query))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This simple agent can generate both synchronous responses and streaming responses. In practice, you might replace this with a more sophisticated agent like a fine-tuned LLM, an RAG system, or any other AI agent."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Minimal FastAPI App\n",
    "\n",
    "Now, let's create a minimal FastAPI application with a health check endpoint:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "from fastapi import FastAPI\n",
    "\n",
    "# Initialize FastAPI app\n",
    "app = FastAPI(\n",
    "    title=\"Agent API\",\n",
    "    description=\"A simple API that serves an AI agent\",\n",
    "    version=\"0.1.0\"\n",
    ")\n",
    "\n",
    "# Create an instance of our agent\n",
    "agent = SimpleAgent()\n",
    "\n",
    "# Health check endpoint\n",
    "@app.get(\"/health\")\n",
    "def health_check():\n",
    "    \"\"\"Check if the API is running\"\"\"\n",
    "    return {\"status\": \"ok\", \"message\": \"API is operational\"}"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This creates a basic FastAPI application with metadata and a health check endpoint. The health check is a simple way to verify that your API is running correctly."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## POST /agent - Synchronous Endpoint\n",
    "\n",
    "Now, let's create a synchronous endpoint for our agent. We'll use Pydantic models to define the request and response structures:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pydantic import BaseModel\n",
    "from typing import Optional\n",
    "\n",
    "# Define request and response models\n",
    "class QueryRequest(BaseModel):\n",
    "    query: str\n",
    "    context: Optional[str] = None\n",
    "    \n",
    "    class Config:\n",
    "        schema_extra = {\n",
    "            \"example\": {\n",
    "                \"query\": \"What is FastAPI?\",\n",
    "                \"context\": \"I'm a beginner programmer.\"\n",
    "            }\n",
    "        }\n",
    "\n",
    "class QueryResponse(BaseModel):\n",
    "    response: str\n",
    "    \n",
    "    class Config:\n",
    "        schema_extra = {\n",
    "            \"example\": {\n",
    "                \"response\": \"FastAPI is a modern, high-performance web framework for building APIs with Python.\"\n",
    "            }\n",
    "        }\n",
    "\n",
    "# Create a synchronous endpoint for the agent\n",
    "@app.post(\"/agent\", response_model=QueryResponse)\n",
    "def query_agent(request: QueryRequest):\n",
    "    \"\"\"Get a response from the agent\"\"\"\n",
    "    response = agent.generate_response(request.query)\n",
    "    return QueryResponse(response=response)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This endpoint accepts POST requests with a JSON body containing a \"query\" field and an optional \"context\" field. It returns a JSON response with the agent's answer."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## POST /agent/stream - Token Streaming\n",
    "\n",
    "For many AI applications, token streaming provides a better user experience. Let's implement a streaming endpoint:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "from fastapi.responses import StreamingResponse\n",
    "import json\n",
    "\n",
    "@app.post(\"/agent/stream\")\n",
    "async def stream_agent(request: QueryRequest):\n",
    "    \"\"\"Stream a response from the agent token by token\"\"\"\n",
    "    \n",
    "    async def event_generator():\n",
    "        async for token in agent.generate_response_stream(request.query):\n",
    "            # Format as a JSON object\n",
    "            data = json.dumps({\"token\": token})\n",
    "            yield f\"data: {data}\\n\\n\"\n",
    "\n",
    "    return StreamingResponse(\n",
    "        event_generator(),\n",
    "        media_type=\"text/event-stream\"\n",
    "    )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This endpoint streams the agent's response token by token using Server-Sent Events (SSE). The client can process these tokens incrementally as they arrive, enabling a more interactive experience.\n",
    "\n",
    "For a more sophisticated implementation, you might want to use the `sse-starlette` package:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "from sse_starlette.sse import EventSourceResponse\n",
    "\n",
    "@app.post(\"/agent/stream-sse\")\n",
    "async def stream_agent_sse(request: QueryRequest):\n",
    "    \"\"\"Stream a response using SSE with the sse-starlette package\"\"\"\n",
    "    \n",
    "    async def event_generator():\n",
    "        async for token in agent.generate_response_stream(request.query):\n",
    "            yield {\"data\": json.dumps({\"token\": token})}\n",
    "    \n",
    "    return EventSourceResponse(event_generator())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This provides a more robust implementation of Server-Sent Events."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Creating the Full Application\n",
    "\n",
    "Now, let's put everything together into a complete FastAPI application. Create a file named `fastapi_agent.py` in your `scripts` directory:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "from fastapi import FastAPI, Depends, HTTPException, Header\n",
    "from fastapi.responses import StreamingResponse\n",
    "from pydantic import BaseModel\n",
    "from typing import Optional\n",
    "import json\n",
    "import os\n",
    "import asyncio\n",
    "\n",
    "# Define our simple agent class\n",
    "class SimpleAgent:\n",
    "    def __init__(self, name=\"FastAPI Agent\"):\n",
    "        self.name = name\n",
    "    \n",
    "    def generate_response(self, query):\n",
    "        \"\"\"Generate a synchronous response to a user query\"\"\"\n",
    "        return f\"Agent {self.name} received: '{query}'\\nResponse: This is a simulated agent response.\"\n",
    "    \n",
    "    async def generate_response_stream(self, query):\n",
    "        \"\"\"Generate a streaming response to a user query\"\"\"\n",
    "        prefix = f\"Agent {self.name} thinking about: '{query}'\\n\"\n",
    "        response = \"This is a simulated agent response that streams token by token.\"\n",
    "        \n",
    "        # Yield the prefix as a single chunk\n",
    "        yield prefix\n",
    "        \n",
    "        # Stream the response token by token with small delays\n",
    "        for token in response.split():\n",
    "            await asyncio.sleep(0.1)  # Simulate thinking time\n",
    "            yield token + \" \"\n",
    "\n",
    "# Define request and response models\n",
    "class QueryRequest(BaseModel):\n",
    "    query: str\n",
    "    context: Optional[str] = None\n",
    "    \n",
    "    class Config:\n",
    "        schema_extra = {\n",
    "            \"example\": {\n",
    "                \"query\": \"What is FastAPI?\",\n",
    "                \"context\": \"I'm a beginner programmer.\"\n",
    "            }\n",
    "        }\n",
    "\n",
    "class QueryResponse(BaseModel):\n",
    "    response: str\n",
    "    \n",
    "    class Config:\n",
    "        schema_extra = {\n",
    "            \"example\": {\n",
    "                \"response\": \"FastAPI is a modern, high-performance web framework for building APIs with Python.\"\n",
    "            }\n",
    "        }\n",
    "\n",
    "# Initialize FastAPI app\n",
    "app = FastAPI(\n",
    "    title=\"Agent API\",\n",
    "    description=\"A simple API that serves an AI agent\",\n",
    "    version=\"0.1.0\"\n",
    ")\n",
    "\n",
    "# Create an instance of our agent\n",
    "agent = SimpleAgent()\n",
    "\n",
    "# Health check endpoint\n",
    "@app.get(\"/health\")\n",
    "def health_check():\n",
    "    \"\"\"Check if the API is running\"\"\"\n",
    "    return {\"status\": \"ok\", \"message\": \"API is operational\"}\n",
    "\n",
    "# Create a synchronous endpoint for the agent\n",
    "@app.post(\"/agent\", response_model=QueryResponse)\n",
    "def query_agent(request: QueryRequest):\n",
    "    \"\"\"Get a response from the agent\"\"\"\n",
    "    response = agent.generate_response(request.query)\n",
    "    return QueryResponse(response=response)\n",
    "\n",
    "# Create a streaming endpoint for the agent\n",
    "@app.post(\"/agent/stream\")\n",
    "async def stream_agent(request: QueryRequest):\n",
    "    \"\"\"Stream a response from the agent token by token\"\"\"\n",
    "    \n",
    "    async def event_generator():\n",
    "        async for token in agent.generate_response_stream(request.query):\n",
    "            # Format as a JSON object\n",
    "            data = json.dumps({\"token\": token})\n",
    "            yield f\"data: {data}\\n\\n\"\n",
    "\n",
    "    return StreamingResponse(\n",
    "        event_generator(),\n",
    "        media_type=\"text/event-stream\"\n",
    "    )"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Running the Server\n",
    "\n",
    "Now that we have our FastAPI application, let's run it with uvicorn:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# This command should be run in the terminal, not in a notebook cell\n",
    "!cd tutorials/fastapi-agent && uvicorn fastapi_agent:app --reload"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `--reload` flag enables hot reloading, which automatically restarts the server when you make changes to the code. This is helpful during development.\n",
    "\n",
    "Once running, you can access:\n",
    "- API documentation at http://localhost:8000/docs\n",
    "- Alternative documentation at http://localhost:8000/redoc\n",
    "- Health check endpoint at http://localhost:8000/health"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Simple Client Test\n",
    "\n",
    "Let's test our API with a simple Python client:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Synchronous Response:\n",
      "{'response': \"Agent FastAPI Agent received: 'What is FastAPI?'\\nResponse: This is a simulated agent response.\"}\n",
      "\n",
      "----------------------------------------\n",
      "\n",
      "Streaming Response:\n",
      "Agent FastAPI Agent thinking about: 'Tell me about streaming'\n",
      "This is a simulated agent response that streams token by token. "
     ]
    }
   ],
   "source": [
    "import requests\n",
    "import json\n",
    "\n",
    "# Test the synchronous endpoint\n",
    "response = requests.post(\n",
    "    \"http://localhost:8000/agent\", \n",
    "    json={\"query\": \"What is FastAPI?\"}\n",
    ")\n",
    "print(\"Synchronous Response:\")\n",
    "print(response.json())\n",
    "print(\"\\n\" + \"-\" * 40 + \"\\n\")\n",
    "\n",
    "# Test the streaming endpoint\n",
    "response = requests.post(\n",
    "    \"http://localhost:8000/agent/stream\",\n",
    "    json={\"query\": \"Tell me about streaming\"},\n",
    "    stream=True\n",
    ")\n",
    "\n",
    "print(\"Streaming Response:\")\n",
    "for line in response.iter_lines():\n",
    "    if line:\n",
    "        # Parse the SSE format\n",
    "        line = line.decode('utf-8')\n",
    "        if line.startswith('data: '):\n",
    "            data = json.loads(line[6:])\n",
    "            print(data[\"token\"], end=\"\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This script tests both the synchronous and streaming endpoints of our API."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Adding Basic Auth Key (Optional)\n",
    "\n",
    "For production use, you might want to add simple API key authentication. Let's extend our FastAPI application to check for an API key:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "from fastapi import Depends, HTTPException, Header\n",
    "\n",
    "# Function to validate API key\n",
    "async def verify_api_key(x_api_key: str = Header(None)):\n",
    "    \"\"\"Verify the API key provided in the X-API-Key header\"\"\"\n",
    "    # Get the API key from environment variable\n",
    "    api_key = os.environ.get(\"API_KEY\")\n",
    "    \n",
    "    # If no API key is set in the environment, skip validation\n",
    "    if not api_key:\n",
    "        return True\n",
    "    \n",
    "    # If API key is set but not provided in the request, return 401\n",
    "    if not x_api_key:\n",
    "        raise HTTPException(status_code=401, detail=\"API Key is missing\")\n",
    "    \n",
    "    # If API key doesn't match, return 403\n",
    "    if x_api_key != api_key:\n",
    "        raise HTTPException(status_code=403, detail=\"Invalid API Key\")\n",
    "    \n",
    "    return True\n",
    "\n",
    "# Update endpoints to require API key\n",
    "@app.post(\"/agent\", response_model=QueryResponse)\n",
    "def query_agent(request: QueryRequest, authorized: bool = Depends(verify_api_key)):\n",
    "    \"\"\"Get a response from the agent\"\"\"\n",
    "    response = agent.generate_response(request.query)\n",
    "    return QueryResponse(response=response)\n",
    "\n",
    "@app.post(\"/agent/stream\")\n",
    "async def stream_agent(request: QueryRequest, authorized: bool = Depends(verify_api_key)):\n",
    "    \"\"\"Stream a response from the agent token by token\"\"\"\n",
    "    # Same implementation as before"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "With this update, if you set the `API_KEY` environment variable, the API will require a matching key in the `X-API-Key` header for all requests."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Unit Tests\n",
    "\n",
    "Let's create simple unit tests for our FastAPI application using pytest and the FastAPI test client:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from fastapi.testclient import TestClient\n",
    "from scripts.fastapi_agent import app\n",
    "\n",
    "client = TestClient(app)\n",
    "\n",
    "def test_health_check():\n",
    "    \"\"\"Test the health check endpoint\"\"\"\n",
    "    response = client.get(\"/health\")\n",
    "    assert response.status_code == 200\n",
    "    assert response.json()[\"status\"] == \"ok\"\n",
    "\n",
    "def test_agent_endpoint():\n",
    "    \"\"\"Test the synchronous agent endpoint\"\"\"\n",
    "    response = client.post(\n",
    "        \"/agent\",\n",
    "        json={\"query\": \"Test query\"}\n",
    "    )\n",
    "    assert response.status_code == 200\n",
    "    assert \"response\" in response.json()\n",
    "    assert \"Agent\" in response.json()[\"response\"]\n",
    "\n",
    "def test_stream_endpoint():\n",
    "    \"\"\"Test the streaming agent endpoint\"\"\"\n",
    "    with client.stream(\"POST\", \"/agent/stream\", json={\"query\": \"Test query\"}) as response:\n",
    "        assert response.status_code == 200\n",
    "        assert response.headers[\"content-type\"] == \"text/event-stream\"\n",
    "        # Check that we receive at least some content\n",
    "        content = response.iter_content().read()\n",
    "        assert len(content) > 0"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Save these tests in a file named `test_fastapi_agent.py` in your tests directory and run them with pytest:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# This command should be run in the terminal, not in a notebook cell\n",
    "!pytest -xvs tests/test_fastapi_agent.py"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Next Steps\n",
    "\n",
    "Now that you have a basic FastAPI agent service running, here are some ideas for next steps:\n",
    "\n",
    "- **Add more advanced agents**: Replace the simple agent with your production-ready agent\n",
    "- **Implement authentication and rate limiting**: Add more sophisticated authentication and rate limiting for production use\n",
    "- **Add middleware for logging and monitoring**: Implement middleware for request logging and performance monitoring\n",
    "- **Set up deployment**: Deploy your FastAPI application to a production environment using Docker, Kubernetes, or a cloud service\n",
    "- **Implement async database connections**: Add database integrations for storing conversation history or other data\n",
    "- **Add background tasks**: Use FastAPI's background tasks for long-running operations"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Conclusion\n",
    "\n",
    "In this tutorial, we've built a FastAPI application that serves a simple AI agent with both synchronous and streaming endpoints. We've covered the basics of setting up FastAPI, defining Pydantic models for request/response validation, implementing both synchronous and streaming endpoints, and adding simple authentication.\n",
    "\n",
    "FastAPI's combination of performance, automatic documentation, and developer-friendly features makes it an excellent choice for serving AI agents in production. By following the patterns in this tutorial, you can create robust, production-ready APIs for your own AI agents."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": ".venv",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.0"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
